可爱猫+python3+Flask+aiohttp简单搭建微信机器人

您所在的位置:网站首页 flask 源码 可爱猫+python3+Flask+aiohttp简单搭建微信机器人

可爱猫+python3+Flask+aiohttp简单搭建微信机器人

2023-04-09 00:46| 来源: 网络整理| 查看: 265

之前一直在研究QQ机器人,最近发现一个微信机器人框架,在办公中来增加工作效率。

一、需要环境:

可爱猫(http://www.keaimao.com/) python3.8+ flask、aiohttp 微信 for windows 3.4.0.38

1.启动可爱猫、登录微信 2.在可爱猫应用中找到HTTP个人对接版,双击打开

其中 消息回调地址,相当于消息接收地址 调用本地api地址,相当于消息发送地址

用python整个run.py,顺便从隔壁投一个flask使用async的方法

二、核心代码(run.py) import flask import logging from flask import Flask, jsonify, has_request_context, copy_current_request_context, request from functools import wraps from concurrent.futures import Future, ThreadPoolExecutor import time from getmsg import getmsg import asyncio def run_async(func): @wraps(func) def _wrapper(*args, **kwargs): call_result = Future() def _run(): loop = asyncio.new_event_loop() try: result = loop.run_until_complete(func(*args, **kwargs)) except Exception as error: call_result.set_exception(error) else: call_result.set_result(result) finally: loop.close() loop_executor = ThreadPoolExecutor(max_workers=1) if has_request_context(): _run = copy_current_request_context(_run) loop_future = loop_executor.submit(_run) loop_future.result() return call_result.result() return _wrapper app = flask.Flask(__name__) @app.route('/', methods=['POST']) @run_async async def main(): msg,from_name,final_from_name,time_str,from_wxid,final_from_wxid,msg_type = getmsg() #这一块后面可以写插件主要内容 #... return 'ok' if __name__ == '__main__': app.config['JSON_AS_ASCII'] = False log = logging.getLogger('werkzeug') log.disabled = True app.run(host='0.0.0.0', port=8074, debug=True) 三、可爱猫提供的接收与发送方法 1.接收消息(getmsg.py) import flask,time def getmsg(): #取msg allmsg = flask.request.get_data() msg_type =flask.request.form.get('type') if msg_type == '200': msg_type_name = '群聊消息' elif msg_type == '100': msg_type_name = '私聊消息' msg = flask.request.form.get('msg') from_name = flask.request.form.get('from_name').encode('utf-8').decode('utf-8') final_from_name = flask.request.form.get('final_from_name').encode('utf-8').decode('utf-8') time_str = flask.request.form.get('time') time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time_str))) from_wxid = flask.request.form.get('from_wxid') final_from_wxid = flask.request.form.get('final_from_wxid') print(f'\033[1;36m[INFO] [{time_str}]\033[0m [{msg_type_name}(\033[1;34m{from_name}\033[0m)]\033[1;35m {final_from_name} \033[0m: \033[1;32m{msg}\033[0m ') return msg,from_name,final_from_name,time_str,from_wxid,final_from_wxid,msg_type 2.发送消息(httpapi.py)

这里按照可爱猫提供的方法写了三种,基本上已经够用了其中wxid_xxxxx为机器人的wxid

import json import time import aiohttp url = 'http://127.0.0.1:8073/send' async def send_msg_private(msg, to_wxid): data = { "type": "100", "msg": msg, "to_wxid": to_wxid, "robot_wxid":"wxid_xxxxx" } data = json.dumps(data) async with aiohttp.ClientSession() as session: async with session.post(url, data=data) as resp: print(await resp.text()) async def send_msg_group(msg, to_wxid): data = { "type": "100", "msg": msg, "to_wxid": to_wxid, "robot_wxid":"wxid_xxxxx" } data = json.dumps(data) async with aiohttp.ClientSession() as session: async with session.post(url, data=data) as resp: print(await resp.text()) async def send_image_msg(picpath, to_wxid): data = { "type": "106", "msg": picpath, "to_wxid": to_wxid, "robot_wxid":"wxid_xxxxx" } data = json.dumps(data) async with aiohttp.ClientSession() as session: async with session.post(url, data=data) as resp: print(await resp.text()) 四、插件例子(百度文心的AI对对联duilian.py)

插件我放在plugin文件夹下,这个随意

import wenxin_api from wenxin_api.tasks.couplet import Couplet def duilian(str1): wenxin_api.ak = ""#自己申请 wenxin_api.sk = ""#自己申请 input_dict = { "text": f"上联:{str1}\n下联:", "seq_len": 512, "topp": 0.9, "penalty_score": 1.0, "min_dec_len": 2, "is_unidirectional": 0, "task_prompt": "couplet" } rst = Couplet.create(**input_dict) #返回result return rst['result'] 五、在run.py中添加 ... from plugin.duilian import duilian #上面main()中getmsg后面 async def main() if '#上联' in msg and msg_type == '200': args = msg.split('上联')[-1] if args == '': pass#这里可以自行添加判断这里图方便我就没写 else: #print(args) result ='下联:'+duilian(args) #发送微信消息 await send_msg_group(result,from_wxid)

运行效果



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3